Skip to content

Commit

Permalink
rewrite receipt processor
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Aug 13, 2024
1 parent ffce0f7 commit c5aacb8
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 102 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ chrono = { version = "0.4.34", default-features = false, features = [
"serde",
] }
ethers = "2.0.14"
flate2 = "1.0.31"
rdkafka = { version = "0.36.0", features = [
"cmake-build",
"gssapi",
Expand Down
246 changes: 144 additions & 102 deletions src/receipts.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
use std::{collections::HashMap, fs::File, io::Write as _, path::PathBuf};
use std::{collections::BTreeMap, fs::File, io::Write as _, path::PathBuf};

use anyhow::Context as _;
use chrono::{serde::ts_milliseconds, DateTime, Datelike, Duration, Utc};
use chrono::{serde::ts_milliseconds, DateTime, Duration, Utc};
use ethers::providers::StreamExt;
use flate2::{read::GzDecoder, write::GzEncoder};
use rdkafka::{
consumer::{Consumer, StreamConsumer},
Message,
};
use serde::Deserialize;
use thegraph_core::types::alloy_primitives::Address;
use tokio::sync::watch;
use tokio::sync::{mpsc, watch};

use crate::config;

pub async fn track_receipts(
config: &config::Kafka,
graph_env: String,
) -> anyhow::Result<watch::Receiver<HashMap<Address, u128>>> {
) -> anyhow::Result<watch::Receiver<BTreeMap<Address, u128>>> {
let window = Duration::days(28);
let db = DB::new(config.cache.clone(), window).context("failed to init DB")?;
let latest_timestamp = db.last_flush;
tracing::debug!(?latest_timestamp);
let (tx, rx) = watch::channel(Default::default());
let db = DB::spawn(config.cache.clone(), window, tx).context("failed to init DB")?;

let mut client_config = rdkafka::ClientConfig::new();
client_config.extend(config.config.clone().into_iter());
Expand All @@ -36,9 +37,8 @@ pub async fn track_receipts(
let mut consumer: StreamConsumer = client_config.create()?;
consumer.subscribe(&[&config.topic])?;

let (tx, rx) = watch::channel(Default::default());
tokio::spawn(async move {
if let Err(kafka_consumer_err) = process_messages(&mut consumer, db, tx, graph_env).await {
if let Err(kafka_consumer_err) = process_messages(&mut consumer, db, graph_env).await {
tracing::error!(%kafka_consumer_err);
}
});
Expand All @@ -48,133 +48,175 @@ pub async fn track_receipts(

async fn process_messages(
consumer: &mut StreamConsumer,
mut db: DB,
tx: watch::Sender<HashMap<Address, u128>>,
db: mpsc::Sender<Update>,
graph_env: String,
) -> anyhow::Result<()> {
let mut rate_count: u64 = 0;
let mut latest_msg = Utc::now();
loop {
let msg = match consumer.recv().await {
Ok(msg) => msg,
Err(recv_error) => {
tracing::error!(%recv_error);
continue;
consumer
.stream()
.for_each_concurrent(16, |msg| async {
let msg = match msg {
Ok(msg) => msg,
Err(recv_error) => {
tracing::error!(%recv_error);
return;
}
};
let payload = match msg.payload() {
Some(payload) => payload,
None => return,
};
#[derive(Deserialize)]
struct Payload {
#[serde(with = "ts_milliseconds")]
timestamp: DateTime<Utc>,
graph_env: String,
indexer: Address,
fee: f64,
#[serde(default)]
legacy_scalar: bool,
}
};
let payload = match msg.payload() {
Some(payload) => payload,
None => continue,
};

rate_count += 1;
if Utc::now().signed_duration_since(db.last_flush) > Duration::seconds(30) {
let msg_hz = rate_count / 30;
rate_count = 0;
tracing::info!(?latest_msg, msg_hz, "flush");
db.flush()?;
tx.send(db.total_fees())?;
}

#[derive(Deserialize)]
struct Payload {
#[serde(with = "ts_milliseconds")]
timestamp: DateTime<Utc>,
graph_env: String,
indexer: Address,
fee: f64,
#[serde(default)]
legacy_scalar: bool,
}
let payload: Payload = serde_json::from_reader(payload)?;
latest_msg = payload.timestamp.max(latest_msg);
if payload.legacy_scalar || (payload.graph_env != graph_env) {
continue;
}
let payload: Payload = match serde_json::from_reader(payload) {
Ok(payload) => payload,
Err(payload_parse_err) => {
tracing::error!(%payload_parse_err, input = %String::from_utf8_lossy(payload));
return;
}
};
if payload.legacy_scalar || (payload.graph_env != graph_env) {
return;
}
let update = Update {
timestamp: payload.timestamp,
indexer: payload.indexer,
fee: (payload.fee * 1e18) as u128,
};
let _ = db.send(update).await;
})
.await;
Ok(())
}

let fees = (payload.fee * 1e18) as u128;
db.update(payload.indexer, payload.timestamp, fees);
}
struct Update {
timestamp: DateTime<Utc>,
indexer: Address,
fee: u128,
}

struct DB {
data: HashMap<Address, Vec<u128>>,
// indexer debts, aggregated per hour
data: BTreeMap<Address, BTreeMap<i64, u128>>,
file: PathBuf,
window: Duration,
last_flush: DateTime<Utc>,
tx: watch::Sender<BTreeMap<Address, u128>>,
}

impl DB {
fn new(file: PathBuf, window: Duration) -> anyhow::Result<Self> {
pub fn spawn(
file: PathBuf,
window: Duration,
tx: watch::Sender<BTreeMap<Address, u128>>,
) -> anyhow::Result<mpsc::Sender<Update>> {
let cache = File::options()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file)
.context("open cache file")?;
let modified: DateTime<Utc> = DateTime::from(cache.metadata()?.modified()?);
let mut data: HashMap<Address, Vec<u128>> =
serde_json::from_reader(&cache).unwrap_or_default();
drop(cache);
assert!(data.values().all(|v| v.len() == window.num_days() as usize));
let now = Utc::now();
let offset: usize = (now - modified)
.num_days()
.min(window.num_days())
.try_into()
.unwrap_or(0);
for bins in data.values_mut() {
bins.rotate_right(offset);
for entry in bins.iter_mut().take(offset) {
*entry = 0;
let data = match serde_json::from_reader(GzDecoder::new(&cache)) {
Ok(data) => data,
Err(cache_err) => {
tracing::error!(%cache_err);
Default::default()
}
}
Ok(Self {
};
drop(cache);
let mut db = DB {
data,
file,
window,
last_flush: now,
})
}

fn flush(&mut self) -> anyhow::Result<()> {
let now = Utc::now();
if self.last_flush.day() != now.day() {
for bins in self.data.values_mut() {
bins.rotate_right(1);
bins[0] = 0;
tx,
};
db.prune(Utc::now());

let (tx, mut rx) = mpsc::channel(128);
tokio::spawn(async move {
let mut last_flush = Utc::now();
let mut last_snapshot = Utc::now();
let mut message_total: usize = 0;
let buffer_size = 128;
let mut buffer: Vec<Update> = Vec::with_capacity(buffer_size);
loop {
rx.recv_many(&mut buffer, buffer_size).await;
let now = Utc::now();
message_total += buffer.len();
for update in buffer.drain(..) {
db.update(update, now);
}

if (now - last_snapshot) >= Duration::seconds(1) {
let _ = db.tx.send(db.snapshot());
last_snapshot = now;
}
if (now - last_flush) >= Duration::minutes(1) {
db.prune(now);
if let Err(flush_err) = db.flush() {
tracing::error!(%flush_err);
};
let debts: BTreeMap<Address, f64> = db
.snapshot()
.into_iter()
.map(|(k, v)| (k, v as f64 * 1e-18))
.collect();
let update_hz = message_total / (now - last_flush).num_seconds() as usize;
tracing::info!(update_hz, ?debts);
message_total = 0;
last_flush = now;
}
}
}
});

let mut file = File::create(&self.file)?;
serde_json::to_writer(&file, &self.data)?;
file.flush()?;

self.last_flush = now;
Ok(())
Ok(tx)
}

fn update(&mut self, indexer: Address, timestamp: DateTime<Utc>, fees: u128) {
let now = Utc::now();
if timestamp < (now - self.window) {
tracing::warn!(
"discaring update outside window, try moving up consumer group partition offsets"
);
fn update(&mut self, update: Update, now: DateTime<Utc>) {
if update.timestamp < (now - self.window) {
return;
}
let daily_fees = self
let entry = self
.data
.entry(indexer)
.or_insert_with(|| Vec::from_iter((0..self.window.num_days()).map(|_| 0)));
let offset: usize = (now - timestamp).num_days().try_into().unwrap_or(0);
daily_fees[offset] += fees;
.entry(update.indexer)
.or_default()
.entry(hourly_timestamp(update.timestamp))
.or_default();
*entry += update.fee;
}

fn prune(&mut self, now: DateTime<Utc>) {
let min_timestamp = hourly_timestamp(now - self.window);
self.data.retain(|_, entries| {
entries.retain(|t, _| *t > min_timestamp);
!entries.is_empty()
});
}

fn total_fees(&self) -> HashMap<Address, u128> {
fn flush(&mut self) -> anyhow::Result<()> {
let mut file = File::create(&self.file)?;
let gz = GzEncoder::new(file.try_clone().unwrap(), flate2::Compression::new(4));
serde_json::to_writer(gz, &self.data)?;
file.flush()?;
Ok(())
}

fn snapshot(&self) -> BTreeMap<Address, u128> {
self.data
.iter()
.map(|(indexer, daily_fees)| (*indexer, daily_fees.iter().sum()))
.map(|(indexer, entries)| (*indexer, entries.values().sum()))
.collect()
}
}

fn hourly_timestamp(t: DateTime<Utc>) -> i64 {
let t = t.timestamp();
t - (t % Duration::hours(1).num_seconds())
}

0 comments on commit c5aacb8

Please sign in to comment.