Skip to content

Commit

Permalink
fix: audit issues (#583)
Browse files Browse the repository at this point in the history
* fix: audit issues

* chore: update flake deps

* fix: update tracing to support new otel api

* fix: update tracer service name
  • Loading branch information
dolcalmi authored Nov 26, 2024
1 parent 1d61fbf commit 7d29a10
Show file tree
Hide file tree
Showing 10 changed files with 1,310 additions and 1,019 deletions.
2,170 changes: 1,221 additions & 949 deletions Cargo.lock

Large diffs are not rendered by default.

41 changes: 30 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,36 @@ edition = "2021"
fail-on-warnings = []

[dependencies]
sqlx-ledger = { version = "0.11.4", features = ["otel"] }
sqlx-ledger = { version = "0.11.5", features = ["otel"] }

anyhow = "1.0.82"
bitcoincore-rpc = "0.17.0"
clap = { version = "4.5", features = ["derive", "env"] }
chrono = { version = "0.4.38", features = ["clock", "serde"], default-features = false }
chrono = { version = "0.4.38", features = [
"clock",
"serde",
], default-features = false }
derive_builder = "0.20.0"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
serde_yaml = "0.9.32"
sqlx = { version = "0.7.4", features = ["runtime-tokio-rustls", "postgres", "rust_decimal", "uuid", "chrono"] }
sqlxmq = { version = "0.5", default-features = false, features = ["runtime-tokio-rustls"] }
sqlx = { version = "0.8.2", features = [
"runtime-tokio-rustls",
"postgres",
"rust_decimal",
"uuid",
"chrono",
] }
sqlxmq = { git = "https://github.com/HyperparamAI/sqlxmq", rev = "52c3daf6af55416aefa4b1114e108f968f6c57d4", default-features = false, features = [
"runtime-tokio-rustls",
] }
tokio = { version = "1.37", features = ["rt-multi-thread", "macros"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tonic = "0.11.0"
tonic-health = "0.11.0"
rust_decimal = "1.35"
prost = "0.12"
prost-wkt-types = { version = "0.5", features = ["vendored-protoc"]}
prost-wkt-types = { version = "0.5", features = ["vendored-protoc"] }
rust_decimal_macros = "1.34"
rusty-money = "0.4.1"
thiserror = "1.0.61"
Expand All @@ -35,15 +46,21 @@ futures = "0.3.30"
url = "2.5.2"
rand = "0.8.5"
bdk = "0.29.0"
opentelemetry = { version = "0.23.0" }
opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.16.0", features = ["http-proto", "reqwest-client"] }
opentelemetry = { version = "0.27.0" }
opentelemetry_sdk = { version = "0.27.0", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.27.0", features = [
"http-proto",
"reqwest-client",
] }
tracing = "0.1.40"
tracing-opentelemetry = "0.24.0"
tracing-opentelemetry = "0.28.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
serde_with = "3.8.1"
electrum-client = "0.18.0"
reqwest = { version = "0.12.5", default-features = false, features = ["json", "rustls-tls"] }
reqwest = { version = "0.12.5", default-features = false, features = [
"json",
"rustls-tls",
] }
async-trait = "0.1.80"
base64 = "0.22.1"
tempfile = "3.10.1"
Expand All @@ -53,7 +70,9 @@ regex = "1.10.4"
miniscript = "10.0"
reqwest-retry = "0.5.0"
reqwest-middleware = "0.3"
tonic_lnd = { version = "0.2.0", package="fedimint-tonic-lnd", features = ["lightningrpc"] }
tonic_lnd = { version = "0.2.0", package = "fedimint-tonic-lnd", features = [
"lightningrpc",
] }

[dev-dependencies]
serial_test = "*"
Expand Down
21 changes: 9 additions & 12 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/fees/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl FeesClient {
};
tracing::Span::current().record(
"fee_rate",
&tracing::field::display(format!("{:?}", fee_rate)),
tracing::field::display(format!("{:?}", fee_rate)),
);
Ok(fee_rate)
}
Expand Down
2 changes: 1 addition & 1 deletion src/job/batch_broadcasting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn execute(
let blockchain = init_electrum(&blockchain_cfg.electrum_url).await?;
let batch = batches.find_by_id(data.account_id, data.batch_id).await?;
let span = tracing::Span::current();
span.record("txid", &tracing::field::display(batch.bitcoin_tx_id));
span.record("txid", tracing::field::display(batch.bitcoin_tx_id));
if batch.accounting_complete() {
if let Some(tx) = batch.signed_tx {
blockchain.broadcast(&tx).map_err(BdkError::BdkLibError)?;
Expand Down
6 changes: 3 additions & 3 deletions src/job/batch_signing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn execute(
let mut new_sessions = HashMap::new();
let mut account_xpubs = HashMap::new();
let batch = batches.find_by_id(data.account_id, data.batch_id).await?;
span.record("tx_id", &tracing::field::display(batch.bitcoin_tx_id));
span.record("tx_id", tracing::field::display(batch.bitcoin_tx_id));
let unsigned_psbt = batch.unsigned_psbt;
for (wallet_id, summary) in batch.wallet_summaries {
let wallet = wallets.find_by_id(wallet_id).await?;
Expand Down Expand Up @@ -132,7 +132,7 @@ pub async fn execute(
}
let mut sessions = sessions.into_values();

span.record("stalled", &tracing::field::display(stalled));
span.record("stalled", tracing::field::display(stalled));
if let Some(mut first_signed_psbt) = sessions.find_map(|s| s.signed_psbt().cloned()) {
for s in sessions {
if let Some(psbt) = s.signed_psbt() {
Expand All @@ -141,7 +141,7 @@ pub async fn execute(
}
if current_keychain.is_none() {
let batch = batches.find_by_id(data.account_id, data.batch_id).await?;
span.record("tx_id", &tracing::field::display(batch.bitcoin_tx_id));
span.record("tx_id", tracing::field::display(batch.bitcoin_tx_id));
let wallet_id = batch.wallet_summaries.into_keys().next().unwrap();
let wallet = wallets.find_by_id(wallet_id).await?;
current_keychain = Some(wallet.current_keychain_wallet(&pool));
Expand Down
23 changes: 10 additions & 13 deletions src/job/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,14 @@ impl<'a> JobExecutor<'a> {
}

async fn handle_error<E: JobExecutionError>(&mut self, meta: JobMeta, error: &E) {
Span::current().record("error", &tracing::field::display("true"));
Span::current().record("error.message", &tracing::field::display(&error));
Span::current().record("error", tracing::field::display("true"));
Span::current().record("error.message", tracing::field::display(&error));
if meta.attempts <= self.warn_retries {
Span::current().record(
"error.level",
&tracing::field::display(tracing::Level::WARN),
);
Span::current().record("error.level", tracing::field::display(tracing::Level::WARN));
} else {
Span::current().record(
"error.level",
&tracing::field::display(tracing::Level::ERROR),
tracing::field::display(tracing::Level::ERROR),
);
}
}
Expand All @@ -133,12 +130,12 @@ impl<'a> JobExecutor<'a> {
data.job_meta.attempts += 1;
data.job_meta.tracing_data = Some(extract_tracing_data());

span.record("job_id", &tracing::field::display(self.job.id()));
span.record("job_name", &tracing::field::display(self.job.name()));
span.record("attempt", &tracing::field::display(data.job_meta.attempts));
span.record("job_id", tracing::field::display(self.job.id()));
span.record("job_name", tracing::field::display(self.job.name()));
span.record("attempt", tracing::field::display(data.job_meta.attempts));
span.record(
"checkpoint_json",
&tracing::field::display(serde_json::to_string(&data).expect("Couldn't checkpoint")),
tracing::field::display(serde_json::to_string(&data).expect("Couldn't checkpoint")),
);

let mut checkpoint =
Expand Down Expand Up @@ -169,11 +166,11 @@ impl<'a> JobExecutor<'a> {
.await?;

if data.job_meta.attempts >= self.max_attempts {
span.record("last_attempt", &tracing::field::display(true));
span.record("last_attempt", tracing::field::display(true));
self.job.complete().await?;
Ok(true)
} else {
span.record("last_attempt", &tracing::field::display(false));
span.record("last_attempt", tracing::field::display(false));
Ok(false)
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/job/process_payout_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,23 @@ pub(super) async fn execute<'a>(

let span = tracing::Span::current();
if let (Some(tx_id), Some(psbt)) = (tx_id, psbt) {
span.record("tx_id", &tracing::field::display(tx_id));
span.record("psbt", &tracing::field::display(&psbt));
span.record("tx_id", tracing::field::display(tx_id));
span.record("psbt", tracing::field::display(&psbt));

let wallet_ids = wallet_totals.keys().copied().collect();
span.record("batch_id", &tracing::field::display(data.batch_id));
span.record("total_fee_sats", &tracing::field::display(fee_satoshis));
span.record("batch_id", tracing::field::display(data.batch_id));
span.record("total_fee_sats", tracing::field::display(fee_satoshis));
span.record(
"total_change_sats",
&tracing::field::display(
tracing::field::display(
wallet_totals
.values()
.fold(Satoshis::ZERO, |acc, v| acc + v.change_satoshis),
),
);
span.record(
"cpfp_fee_sats",
&tracing::field::display(
tracing::field::display(
wallet_totals
.values()
.fold(Satoshis::ZERO, |acc, v| acc + v.cpfp_fee_satoshis),
Expand Down Expand Up @@ -188,7 +188,7 @@ pub async fn construct_psbt(
..
} = payout_queue;
span.record("payout_queue_name", queue_name);
span.record("payout_queue_id", &tracing::field::display(queue_id));
span.record("payout_queue_id", tracing::field::display(queue_id));
span.record("n_unbatched_payouts", unbatched_payouts.n_payouts());

let wallets = wallets.find_by_ids(unbatched_payouts.wallet_ids()).await?;
Expand Down
4 changes: 2 additions & 2 deletions src/outbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ impl Outbox {

current_span.record(
"ledger_event",
&tracing::field::display(
tracing::field::display(
serde_json::to_string(&ledger_event).expect("Couldn't serialize JournalEvent"),
),
);
let payloads = Vec::<OutboxEventPayload>::from(ledger_event.metadata);
let sequences = self.sequences_for(ledger_event.account_id).await?;
let mut write_sequences = sequences.write().await;
let mut sequence = write_sequences.0;
current_span.record("next_sequence", &tracing::field::display(sequence));
current_span.record("next_sequence", tracing::field::display(sequence));
let events: Vec<OutboxEvent<_>> = payloads
.into_iter()
.map(|payload| {
Expand Down
46 changes: 26 additions & 20 deletions src/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::{propagation::TextMapPropagator, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Sampler};
use opentelemetry_sdk::trace::{Config, Sampler, TracerProvider};
use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::Span;
Expand Down Expand Up @@ -30,23 +32,27 @@ impl Default for TracingConfig {

pub fn init_tracer(config: TracingConfig) -> anyhow::Result<()> {
let tracing_endpoint = format!("http://{}:{}", config.host, config.port);
let service_name = config.service_name;
println!("Sending traces to {tracing_endpoint}");
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(tracing_endpoint),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(Sampler::AlwaysOn)
.with_resource(opentelemetry_sdk::Resource::new(vec![KeyValue::new(
"service.name",
config.service_name,
)])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)?;

let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(tracing_endpoint)
.build()?;

let provider_config = Config::default()
.with_sampler(Sampler::AlwaysOn)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
service_name.clone(),
)]));

let provider = TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.with_config(provider_config)
.build();
let tracer = provider.tracer(service_name);

let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

let fmt_layer = fmt::layer().json();
Expand Down Expand Up @@ -77,9 +83,9 @@ pub fn inject_tracing_data(span: &Span, tracing_data: &HashMap<String, String>)
}

pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) {
Span::current().record("error", &tracing::field::display("true"));
Span::current().record("error.level", &tracing::field::display(level));
Span::current().record("error.message", &tracing::field::display(error));
Span::current().record("error", tracing::field::display("true"));
Span::current().record("error.level", tracing::field::display(level));
Span::current().record("error.message", tracing::field::display(error));
}

pub async fn record_error<
Expand Down

0 comments on commit 7d29a10

Please sign in to comment.