diff --git a/Cargo.lock b/Cargo.lock
index 36b7142..70f17b9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3926,6 +3926,7 @@ dependencies = [
  "anyhow",
  "chrono",
  "ethers",
+ "flate2",
  "rdkafka",
  "reqwest 0.11.27",
  "reqwest 0.12.5",
diff --git a/Cargo.toml b/Cargo.toml
index 72759d1..4f8ff1a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ chrono = { version = "0.4.34", default-features = false, features = [
     "serde",
 ] }
 ethers = "2.0.14"
+flate2 = "1.0.31"
 rdkafka = { version = "0.36.0", features = [
     "cmake-build",
     "gssapi",
diff --git a/src/receipts.rs b/src/receipts.rs
index f39229c..7c149b0 100644
--- a/src/receipts.rs
+++ b/src/receipts.rs
@@ -1,25 +1,26 @@
-use std::{collections::HashMap, fs::File, io::Write as _, path::PathBuf};
+use std::{collections::BTreeMap, fs::File, io::Write as _, path::PathBuf};
 
 use anyhow::Context as _;
-use chrono::{serde::ts_milliseconds, DateTime, Datelike, Duration, Utc};
+use chrono::{serde::ts_milliseconds, DateTime, Duration, Utc};
+use ethers::providers::StreamExt;
+use flate2::{read::GzDecoder, write::GzEncoder};
 use rdkafka::{
     consumer::{Consumer, StreamConsumer},
     Message,
 };
 use serde::Deserialize;
 use thegraph_core::types::alloy_primitives::Address;
-use tokio::sync::watch;
+use tokio::sync::{mpsc, watch};
 
 use crate::config;
 
 pub async fn track_receipts(
     config: &config::Kafka,
     graph_env: String,
-) -> anyhow::Result<watch::Receiver<HashMap<Address, u128>>> {
+) -> anyhow::Result<watch::Receiver<BTreeMap<Address, u128>>> {
     let window = Duration::days(28);
-    let db = DB::new(config.cache.clone(), window).context("failed to init DB")?;
-    let latest_timestamp = db.last_flush;
-    tracing::debug!(?latest_timestamp);
+    let (tx, rx) = watch::channel(Default::default());
+    let db = DB::spawn(config.cache.clone(), window, tx).context("failed to init DB")?;
 
     let mut client_config = rdkafka::ClientConfig::new();
     client_config.extend(config.config.clone().into_iter());
@@ -36,9 +37,8 @@ pub async fn track_receipts(
     let mut consumer: StreamConsumer = client_config.create()?;
     consumer.subscribe(&[&config.topic])?;
 
-    let (tx, rx) = watch::channel(Default::default());
     tokio::spawn(async move {
-        if let Err(kafka_consumer_err) = process_messages(&mut consumer, db, tx, graph_env).await {
+        if let Err(kafka_consumer_err) = process_messages(&mut consumer, db, graph_env).await {
             tracing::error!(%kafka_consumer_err);
         }
     });
@@ -48,64 +48,74 @@ pub async fn track_receipts(
 
 async fn process_messages(
     consumer: &mut StreamConsumer,
-    mut db: DB,
-    tx: watch::Sender<HashMap<Address, u128>>,
+    db: mpsc::Sender<Update>,
     graph_env: String,
 ) -> anyhow::Result<()> {
-    let mut rate_count: u64 = 0;
-    let mut latest_msg = Utc::now();
-    loop {
-        let msg = match consumer.recv().await {
-            Ok(msg) => msg,
-            Err(recv_error) => {
-                tracing::error!(%recv_error);
-                continue;
+    consumer
+        .stream()
+        .for_each_concurrent(16, |msg| async {
+            let msg = match msg {
+                Ok(msg) => msg,
+                Err(recv_error) => {
+                    tracing::error!(%recv_error);
+                    return;
+                }
+            };
+            let payload = match msg.payload() {
+                Some(payload) => payload,
+                None => return,
+            };
+            #[derive(Deserialize)]
+            struct Payload {
+                #[serde(with = "ts_milliseconds")]
+                timestamp: DateTime<Utc>,
+                graph_env: String,
+                indexer: Address,
+                fee: f64,
+                #[serde(default)]
+                legacy_scalar: bool,
             }
-        };
-        let payload = match msg.payload() {
-            Some(payload) => payload,
-            None => continue,
-        };
-
-        rate_count += 1;
-        if Utc::now().signed_duration_since(db.last_flush) > Duration::seconds(30) {
-            let msg_hz = rate_count / 30;
-            rate_count = 0;
-            tracing::info!(?latest_msg, msg_hz, "flush");
-            db.flush()?;
-            tx.send(db.total_fees())?;
-        }
-
-        #[derive(Deserialize)]
-        struct Payload {
-            #[serde(with = "ts_milliseconds")]
-            timestamp: DateTime<Utc>,
-            graph_env: String,
-            indexer: Address,
-            fee: f64,
-            #[serde(default)]
-            legacy_scalar: bool,
-        }
-        let payload: Payload = serde_json::from_reader(payload)?;
-        latest_msg = payload.timestamp.max(latest_msg);
-        if payload.legacy_scalar || (payload.graph_env != graph_env) {
-            continue;
-        }
+            let payload: Payload = match serde_json::from_reader(payload) {
+                Ok(payload) => payload,
+                Err(payload_parse_err) => {
+                    tracing::error!(%payload_parse_err, input = %String::from_utf8_lossy(payload));
+                    return;
+                }
+            };
+            if payload.legacy_scalar || (payload.graph_env != graph_env) {
+                return;
+            }
+            let update = Update {
+                timestamp: payload.timestamp,
+                indexer: payload.indexer,
+                fee: (payload.fee * 1e18) as u128,
+            };
+            let _ = db.send(update).await;
+        })
+        .await;
+    Ok(())
+}
 
-        let fees = (payload.fee * 1e18) as u128;
-        db.update(payload.indexer, payload.timestamp, fees);
-    }
+struct Update {
+    timestamp: DateTime<Utc>,
+    indexer: Address,
+    fee: u128,
 }
 
 struct DB {
-    data: HashMap<Address, Vec<u128>>,
+    // indexer debts, aggregated per hour
+    data: BTreeMap<Address, BTreeMap<i64, u128>>,
     file: PathBuf,
     window: Duration,
-    last_flush: DateTime<Utc>,
+    tx: watch::Sender<BTreeMap<Address, u128>>,
 }
 
 impl DB {
-    fn new(file: PathBuf, window: Duration) -> anyhow::Result<Self> {
+    pub fn spawn(
+        file: PathBuf,
+        window: Duration,
+        tx: watch::Sender<BTreeMap<Address, u128>>,
+    ) -> anyhow::Result<mpsc::Sender<Update>> {
         let cache = File::options()
             .read(true)
             .write(true)
@@ -113,68 +123,102 @@ impl DB {
             .truncate(false)
             .open(&file)
             .context("open cache file")?;
-        let modified: DateTime<Utc> = DateTime::from(cache.metadata()?.modified()?);
-        let mut data: HashMap<Address, Vec<u128>> =
-            serde_json::from_reader(&cache).unwrap_or_default();
-        drop(cache);
-        assert!(data.values().all(|v| v.len() == window.num_days() as usize));
-        let now = Utc::now();
-        let offset: usize = (now - modified)
-            .num_days()
-            .min(window.num_days())
-            .try_into()
-            .unwrap_or(0);
-        for bins in data.values_mut() {
-            bins.rotate_right(offset);
-            for entry in bins.iter_mut().take(offset) {
-                *entry = 0;
+        let data = match serde_json::from_reader(GzDecoder::new(&cache)) {
+            Ok(data) => data,
+            Err(cache_err) => {
+                tracing::error!(%cache_err);
+                Default::default()
             }
-        }
-        Ok(Self {
+        };
+        drop(cache);
+        let mut db = DB {
             data,
             file,
             window,
-            last_flush: now,
-        })
-    }
-
-    fn flush(&mut self) -> anyhow::Result<()> {
-        let now = Utc::now();
-        if self.last_flush.day() != now.day() {
-            for bins in self.data.values_mut() {
-                bins.rotate_right(1);
-                bins[0] = 0;
+            tx,
+        };
+        db.prune(Utc::now());
+
+        let (tx, mut rx) = mpsc::channel(128);
+        tokio::spawn(async move {
+            let mut last_flush = Utc::now();
+            let mut last_snapshot = Utc::now();
+            let mut message_total: usize = 0;
+            let buffer_size = 128;
+            let mut buffer: Vec<Update> = Vec::with_capacity(buffer_size);
+            loop {
+                if rx.recv_many(&mut buffer, buffer_size).await == 0 {
+                    return; // channel closed: all senders dropped
+                }
+                let now = Utc::now();
+                message_total += buffer.len();
+                for update in buffer.drain(..) {
+                    db.update(update, now);
+                }
+
+                if (now - last_snapshot) >= Duration::seconds(1) {
+                    let _ = db.tx.send(db.snapshot());
+                    last_snapshot = now;
+                }
+                if (now - last_flush) >= Duration::minutes(1) {
+                    db.prune(now);
+                    if let Err(flush_err) = db.flush() {
+                        tracing::error!(%flush_err);
+                    }
+                    let debts: BTreeMap<Address, f64> = db
+                        .snapshot()
+                        .into_iter()
+                        .map(|(k, v)| (k, v as f64 * 1e-18))
+                        .collect();
+                    let update_hz = message_total / (now - last_flush).num_seconds() as usize;
+                    tracing::info!(update_hz, ?debts);
+                    message_total = 0;
+                    last_flush = now;
+                }
             }
-        }
+        });
 
-        let mut file = File::create(&self.file)?;
-        serde_json::to_writer(&file, &self.data)?;
-        file.flush()?;
-
-        self.last_flush = now;
-        Ok(())
+        Ok(tx)
     }
 
-    fn update(&mut self, indexer: Address, timestamp: DateTime<Utc>, fees: u128) {
-        let now = Utc::now();
-        if timestamp < (now - self.window) {
-            tracing::warn!(
-                "discaring update outside window, try moving up consumer group partition offsets"
-            );
+    fn update(&mut self, update: Update, now: DateTime<Utc>) {
+        if update.timestamp < (now - self.window) {
             return;
         }
-        let daily_fees = self
+        let entry = self
             .data
-            .entry(indexer)
-            .or_insert_with(|| Vec::from_iter((0..self.window.num_days()).map(|_| 0)));
-        let offset: usize = (now - timestamp).num_days().try_into().unwrap_or(0);
-        daily_fees[offset] += fees;
+            .entry(update.indexer)
+            .or_default()
+            .entry(hourly_timestamp(update.timestamp))
+            .or_default();
+        *entry += update.fee;
+    }
+
+    fn prune(&mut self, now: DateTime<Utc>) {
+        let min_timestamp = hourly_timestamp(now - self.window);
+        self.data.retain(|_, entries| {
+            entries.retain(|t, _| *t > min_timestamp);
+            !entries.is_empty()
+        });
     }
 
-    fn total_fees(&self) -> HashMap<Address, u128> {
+    fn flush(&mut self) -> anyhow::Result<()> {
+        let file = File::create(&self.file)?;
+        let mut gz = GzEncoder::new(file, flate2::Compression::new(4));
+        serde_json::to_writer(&mut gz, &self.data)?;
+        gz.finish()?.flush()?; // finalize the gzip stream so trailer errors surface
+        Ok(())
+    }
+
+    fn snapshot(&self) -> BTreeMap<Address, u128> {
         self.data
             .iter()
-            .map(|(indexer, daily_fees)| (*indexer, daily_fees.iter().sum()))
+            .map(|(indexer, entries)| (*indexer, entries.values().sum()))
             .collect()
     }
 }
+
+fn hourly_timestamp(t: DateTime<Utc>) -> i64 {
+    let t = t.timestamp();
+    t - (t % Duration::hours(1).num_seconds())
+}
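
A minimal sketch of how the `watch::Receiver<BTreeMap<Address, u128>>` returned by `track_receipts` might be consumed downstream; the `fees_above` helper and its `threshold` parameter are hypothetical, not part of this diff:

```rust
use std::collections::BTreeMap;

use thegraph_core::types::alloy_primitives::Address;
use tokio::sync::watch;

// The receiver always holds the latest snapshot published by the DB task
// (refreshed roughly once per second), so `borrow()` never blocks.
fn fees_above(
    receiver: &watch::Receiver<BTreeMap<Address, u128>>,
    threshold: u128, // hypothetical cutoff, in the same 18-decimal units as `fee`
) -> Vec<Address> {
    receiver
        .borrow()
        .iter()
        .filter(|(_, fees)| **fees >= threshold)
        .map(|(indexer, _)| *indexer)
        .collect()
}
```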
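A self-contained sketch of the gzip'd JSON round trip that `DB::flush` writes and `DB::spawn` reads back, using a toy map in place of the real cache contents (assumes `anyhow`, `flate2`, and `serde_json` as dependencies):

```rust
use std::collections::BTreeMap;

use flate2::{read::GzDecoder, write::GzEncoder, Compression};

fn main() -> anyhow::Result<()> {
    let data = BTreeMap::from([("indexer-a".to_string(), 1_000_000_000_000_000_000_u128)]);

    // Encode: JSON through a gzip stream, as in DB::flush. `finish` writes the
    // gzip trailer; without it the output is truncated.
    let mut gz = GzEncoder::new(Vec::new(), Compression::new(4));
    serde_json::to_writer(&mut gz, &data)?;
    let bytes = gz.finish()?;

    // Decode: wrap any reader in a GzDecoder, as in DB::spawn.
    let decoded: BTreeMap<String, u128> =
        serde_json::from_reader(GzDecoder::new(bytes.as_slice()))?;
    assert_eq!(data, decoded);
    Ok(())
}
```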
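For reference, `DB::update` buckets fees by truncating timestamps to the start of their UTC hour, so a 28-day window retains at most 28 × 24 = 672 entries per indexer; a standalone illustration (duplicating `hourly_timestamp` so the snippet compiles on its own):

```rust
use chrono::{DateTime, Duration, Utc};

fn hourly_timestamp(t: DateTime<Utc>) -> i64 {
    let t = t.timestamp();
    t - (t % Duration::hours(1).num_seconds())
}

fn main() {
    let a: DateTime<Utc> = "2024-07-01T12:05:00Z".parse().unwrap();
    let b: DateTime<Utc> = "2024-07-01T12:59:59Z".parse().unwrap();
    // Same UTC hour, same bucket key: both fees aggregate into one map entry.
    assert_eq!(hourly_timestamp(a), hourly_timestamp(b));
}
```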