From d1e3eb937b694a74032a9c88b073326dbccc50e6 Mon Sep 17 00:00:00 2001
From: Sergej Sakac <73715684+Szegoo@users.noreply.github.com>
Date: Sat, 10 Feb 2024 17:14:22 +0100
Subject: [PATCH] Consumption processing (#29)
* Consumption processing
* fix tests
* process
* fix
* make clippy happy
---
.github/workflows/tests.yaml | 15 ++++++
Cargo.lock | 11 ++++
Cargo.toml | 1 +
bin/processor/Cargo.toml | 13 +++++
bin/processor/src/main.rs | 84 +++++++++++++++++++++++++++++++
bin/tracker/src/main.rs | 24 ++++++---
config.toml | 3 +-
registry.json | 56 ++++++++++++++++++++-
routes/config.toml | 1 +
routes/mock-out/out/placeholder | 0
routes/src/consumption.rs | 2 +-
routes/src/extend_subscription.rs | 6 ++-
routes/src/register.rs | 8 +--
routes/tests/mock.rs | 2 +-
scripts/init.sh | 9 +---
scripts/process.sh | 3 ++
scripts/reset_env.sh | 0
shared/src/config.rs | 16 +++++-
shared/src/consumption.rs | 73 ++++++++++++++++++++-------
shared/src/lib.rs | 2 +-
types/src/lib.rs | 22 ++++++++
21 files changed, 306 insertions(+), 45 deletions(-)
create mode 100644 bin/processor/Cargo.toml
create mode 100644 bin/processor/src/main.rs
create mode 100644 routes/mock-out/out/placeholder
create mode 100755 scripts/process.sh
mode change 100644 => 100755 scripts/reset_env.sh
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 87af391..7684cfd 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -73,3 +73,18 @@ jobs:
- name: Ensure the project builds
run: cargo build
+ test:
+ needs: install
+ runs-on: ubuntu-latest
+ steps:
+ - name: Use cashed cargo
+ uses: actions/cache@v3
+ with:
+ path: ~/.cargo
+ key: ${{ runner.os }}-rust-${{ hashFiles('rust-toolchain.toml') }}
+
+ - name: Checkout the source code
+ uses: actions/checkout@v3
+
+ - name: Ensure the project builds
+ run: cargo test -- --test-threads=1
diff --git a/Cargo.lock b/Cargo.lock
index 7edd08f..b0bd3f0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3126,6 +3126,17 @@ dependencies = [
"yansi",
]
+[[package]]
+name = "processor"
+version = "0.1.0"
+dependencies = [
+ "env_logger",
+ "log",
+ "polkadot-core-primitives",
+ "shared",
+ "types",
+]
+
[[package]]
name = "psm"
version = "0.1.21"
diff --git a/Cargo.toml b/Cargo.toml
index 4a1bb47..f838aec 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ license = "GPL-3.0-only"
[workspace]
members = [
+ "bin/processor",
"bin/server",
"bin/tracker",
"routes",
diff --git a/bin/processor/Cargo.toml b/bin/processor/Cargo.toml
new file mode 100644
index 0000000..b030fa4
--- /dev/null
+++ b/bin/processor/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "processor"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+log = "0.4"
+shared = { path = "../../shared" }
+env_logger = "0.10.1"
+polkadot-core-primitives = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.1.0" }
+types = { path = "../../types" }
diff --git a/bin/processor/src/main.rs b/bin/processor/src/main.rs
new file mode 100644
index 0000000..7d1bc55
--- /dev/null
+++ b/bin/processor/src/main.rs
@@ -0,0 +1,84 @@
+// This file is part of RegionX.
+//
+// RegionX is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// RegionX is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with RegionX. If not, see .
+
+use shared::{
+ config::config,
+ consumption::{delete_consumption, get_consumption, write_batch_consumption},
+ registry::registered_paras,
+};
+use std::collections::BTreeMap;
+use types::WeightConsumption;
+
+const LOG_TARGET: &str = "processor";
+
+fn main() {
+ env_logger::init();
+
+ let outputs = config().outputs;
+ let paras = registered_paras();
+
+ paras.iter().for_each(|para| {
+ let mut processed = BTreeMap::new();
+
+ log::info!(
+ target: LOG_TARGET,
+ "{}-{} - Processing consumption.",
+ para.relay_chain,
+ para.para_id,
+ );
+
+ (0..outputs).for_each(|output_index| {
+ let consumption = if let Ok(data) = get_consumption(para.clone(), Some(output_index)) {
+ data
+ } else {
+ log::error!(
+ target: LOG_TARGET,
+ "{}-{} - Failed to get consumption.",
+ para.relay_chain,
+ para.para_id,
+ );
+ vec![]
+ };
+
+ consumption.into_iter().for_each(|data| {
+ processed.entry(data.block_number).or_insert(data);
+ });
+ });
+
+ let processed: Vec = processed.values().cloned().collect();
+
+ log::info!(
+ target: LOG_TARGET,
+ "{}-{} - Writing processed consumption. Total blocks tracked: {}",
+ para.relay_chain,
+ para.para_id,
+ processed.len()
+ );
+
+ if let Err(e) = write_batch_consumption(para.clone(), processed) {
+ log::error!(
+ target: LOG_TARGET,
+ "{}-{} - Failed to write batch consumption: {:?}",
+ para.relay_chain,
+ para.para_id,
+ e,
+ );
+
+ return;
+ }
+
+ (0..outputs).for_each(|output_index| delete_consumption(para.clone(), output_index));
+ });
+}
diff --git a/bin/tracker/src/main.rs b/bin/tracker/src/main.rs
index e59a298..a4ea8e1 100644
--- a/bin/tracker/src/main.rs
+++ b/bin/tracker/src/main.rs
@@ -79,27 +79,31 @@ async fn track_weight_consumption(para: Parachain, rpc_index: usize) {
let Some(rpc) = para.rpcs.get(rpc_index) else {
log::error!(
target: LOG_TARGET,
- "Parachain {}-{} doesn't have an rpc with index: {}",
+ "{}-{} - doesn't have an rpc with index: {}",
para.relay_chain, para.para_id, rpc_index,
);
return;
};
- log::info!("Starting to track consumption for: {}-{}", para.relay_chain, para.para_id);
+ log::info!("{}-{} - Starting to track consumption.", para.relay_chain, para.para_id);
let result = OnlineClient::::from_url(rpc).await;
if let Ok(api) = result {
- if let Err(err) = track_blocks(api, para, rpc_index).await {
+ if let Err(err) = track_blocks(api, para.clone(), rpc_index).await {
log::error!(
target: LOG_TARGET,
- "Failed to track new block: {:?}",
+ "{}-{} - Failed to track new block: {:?}",
+ para.relay_chain,
+ para.para_id,
err
);
}
} else {
log::error!(
target: LOG_TARGET,
- "Failed to create online client: {:?}",
+ "{}-{} - Failed to create online client: {:?}",
+ para.relay_chain,
+ para.para_id,
result
);
}
@@ -110,7 +114,13 @@ async fn track_blocks(
para: Parachain,
rpc_index: usize,
) -> Result<(), Box> {
- log::info!("Subsciribing to finalized blocks for: {}", para.para_id);
+ log::info!(
+ target: LOG_TARGET,
+ "{}-{} - Subsciribing to finalized blocks",
+ para.relay_chain,
+ para.para_id
+ );
+
let mut blocks_sub = api
.blocks()
.subscribe_finalized()
@@ -136,7 +146,7 @@ async fn note_new_block(
let timestamp = timestamp_at(api.clone(), block.hash()).await?;
let consumption = weight_consumption(api, block_number, timestamp).await?;
- write_consumption(para, consumption, rpc_index)?;
+ write_consumption(para, consumption, Some(rpc_index))?;
Ok(())
}
diff --git a/config.toml b/config.toml
index 53ce1c6..f417f4b 100644
--- a/config.toml
+++ b/config.toml
@@ -1,6 +1,7 @@
-output_directory = "out/out"
+output_directory = "out/"
registry = "registry.json"
chaindata = "chaindata.json"
+outputs = 2
#[payment_info]
#rpc_url = "wss://rococo-rpc.polkadot.io"
diff --git a/registry.json b/registry.json
index 8379315..affe875 100644
--- a/registry.json
+++ b/registry.json
@@ -9,6 +9,60 @@
],
"para_id": 2000,
"relay_chain": "Polkadot",
- "expiry_timestamp": 1706728108
+ "expiry_timestamp": 1709740246
+ },
+ {
+ "name": "Polkadot",
+ "rpcs": [
+ "wss://rpc-polkadot.luckyfriday.io",
+ "wss://polkadot-rpc.dwellir.com"
+ ],
+ "para_id": 0,
+ "relay_chain": "Polkadot",
+ "expiry_timestamp": 1709740246
+ },
+ {
+ "name": "Moonbeam",
+ "rpcs": [
+ "wss://moonbeam-rpc.dwellir.com",
+ "wss://wss.api.moonbeam.network",
+ "wss://1rpc.io/glmr",
+ "wss://moonbeam.unitedbloc.com"
+ ],
+ "para_id": 2004,
+ "relay_chain": "Polkadot",
+ "expiry_timestamp": 1709740246
+ },
+ {
+ "name": "Astar",
+ "rpcs": [
+ "wss://astar.public.curie.radiumblock.co/ws",
+ "wss://rpc.astar.network",
+ "wss://astar-rpc.dwellir.com",
+ "wss://1rpc.io/astr"
+ ],
+ "para_id": 2006,
+ "relay_chain": "Polkadot",
+ "expiry_timestamp": 1709740246
+ },
+ {
+ "name": "HydraDX",
+ "rpcs": [
+ "wss://hydradx-rpc.dwellir.com",
+ "wss://rpc.hydradx.cloud"
+ ],
+ "para_id": 2034,
+ "relay_chain": "Polkadot",
+ "expiry_timestamp": 1709740246
+ },
+ {
+ "name": "Zeitgeist",
+ "rpcs": [
+ "wss://zeitgeist-rpc.dwellir.com",
+ "wss://main.rpc.zeitgeist.pm/ws"
+ ],
+ "para_id": 2092,
+ "relay_chain": "Polkadot",
+ "expiry_timestamp": 1709740246
}
]
\ No newline at end of file
diff --git a/routes/config.toml b/routes/config.toml
index 7b8c680..720f848 100644
--- a/routes/config.toml
+++ b/routes/config.toml
@@ -2,6 +2,7 @@ output_directory = "mock-out"
registry = "mock-parachains.json"
chaindata = "../chaindata.json"
free_mode = true
+outputs = 1
[payment_info]
rpc_url = "wss://rococo-rpc.polkadot.io"
diff --git a/routes/mock-out/out/placeholder b/routes/mock-out/out/placeholder
new file mode 100644
index 0000000..e69de29
diff --git a/routes/src/consumption.rs b/routes/src/consumption.rs
index c784359..739ac23 100644
--- a/routes/src/consumption.rs
+++ b/routes/src/consumption.rs
@@ -79,7 +79,7 @@ pub fn consumption(
let (start, end) = (start.unwrap_or_default(), end.unwrap_or(Timestamp::MAX));
// By default query the consumption that was collected from rpc index 0.
- let weight_consumptions: Vec = get_consumption(para, 0)
+ let weight_consumptions: Vec = get_consumption(para, None)
.map_err(|_| Error::ConsumptionDataNotFound)?
.into_iter()
.filter(|consumption| consumption.timestamp >= start && consumption.timestamp <= end)
diff --git a/routes/src/extend_subscription.rs b/routes/src/extend_subscription.rs
index 0acfc2c..954083e 100644
--- a/routes/src/extend_subscription.rs
+++ b/routes/src/extend_subscription.rs
@@ -40,7 +40,7 @@ pub async fn extend_subscription(data: Json) -> Result<(
log::info!(
target: LOG_TARGET,
- "Attempting to extend subscription for para: {}:{}",
+ "{}-{} - Attempting to extend subscription for para",
relay_chain, para_id
);
@@ -76,7 +76,9 @@ pub async fn extend_subscription(data: Json) -> Result<(
if let Err(err) = update_registry(paras) {
log::error!(
target: LOG_TARGET,
- "Failed to extend subscription for para: {:?}",
+ "{}-{} Failed to extend subscription for para: {:?}",
+ para.relay_chain,
+ para.para_id,
err
);
}
diff --git a/routes/src/register.rs b/routes/src/register.rs
index 4c812b1..0d035e0 100644
--- a/routes/src/register.rs
+++ b/routes/src/register.rs
@@ -44,7 +44,7 @@ pub async fn register_para(registration_data: Json) -> Result<
log::info!(
target: LOG_TARGET,
- "Attempting to register para: {}:{}",
+ "{}-{} - Attempting to register para",
relay_chain, para_id
);
@@ -71,12 +71,14 @@ pub async fn register_para(registration_data: Json) -> Result<
para.expiry_timestamp = current_timestamp() + subscription_duration;
- paras.push(para);
+ paras.push(para.clone());
if let Err(err) = update_registry(paras) {
log::error!(
target: LOG_TARGET,
- "Failed to register para: {:?}",
+ "{}-{} - Failed to register para: {:?}",
+ para.relay_chain,
+ para.para_id,
err
);
}
diff --git a/routes/tests/mock.rs b/routes/tests/mock.rs
index fb129ce..dbf8373 100644
--- a/routes/tests/mock.rs
+++ b/routes/tests/mock.rs
@@ -38,7 +38,7 @@ impl MockEnvironment {
for (para, weight_consumptions) in &mock.weight_consumptions {
weight_consumptions.iter().for_each(|consumption| {
- write_consumption(para.clone(), consumption.clone(), 0)
+ write_consumption(para.clone(), consumption.clone(), None)
.expect("Failed to write conusumption data");
});
}
diff --git a/scripts/init.sh b/scripts/init.sh
index f2b0965..fe82195 100755
--- a/scripts/init.sh
+++ b/scripts/init.sh
@@ -3,14 +3,10 @@
TRACKER_LOGS_0="logs/tracker-logs-0.out"
TRACKER_LOGS_1="logs/tracker-logs-1.out"
-WATCHDOG_LOGS_0="logs/watchdog-logs-0.out"
-WATCHDOG_LOGS_1="logs/watchdog-logs-1.out"
-
TRACKER="./target/release/tracker"
-WATCHDOG="scripts/watchdog.sh"
reset_env() {
- PIDS=$(pgrep -f "$TRACKER|$WATCHDOG")
+ PIDS=$(pgrep -f "$TRACKER")
if [ -z "$PIDS" ]; then
echo "Process not found."
@@ -27,6 +23,3 @@ reset_env
# start the tracker again
nohup sh -c 'RUST_LOG=INFO ./target/release/tracker --rpc-index 0' > $TRACKER_LOGS_0 2>&1 &
nohup sh -c 'RUST_LOG=INFO ./target/release/tracker --rpc-index 1' > $TRACKER_LOGS_1 2>&1 &
-# start the watchdog
-nohup sh -c 'scripts/watchdog.sh "$1"' _ 0 > $WATCHDOG_LOGS_0 2>&1 &
-nohup sh -c 'scripts/watchdog.sh "$1"' _ 1 > $WATCHDOG_LOGS_1 2>&1 &
diff --git a/scripts/process.sh b/scripts/process.sh
new file mode 100755
index 0000000..3c07cec
--- /dev/null
+++ b/scripts/process.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+sh -c 'RUST_LOG=INFO ./target/release/processor' >> logs/processor.out 2>&1
diff --git a/scripts/reset_env.sh b/scripts/reset_env.sh
old mode 100644
new mode 100755
diff --git a/shared/src/config.rs b/shared/src/config.rs
index 7b24544..6ad6bce 100644
--- a/shared/src/config.rs
+++ b/shared/src/config.rs
@@ -36,10 +36,16 @@ pub struct PaymentInfo {
#[derive(serde::Deserialize)]
pub struct Config {
+ /// Path to the root output directory.
pub output_directory: String,
+ /// Path to the registry file.
pub registry: String,
+ /// Path to the chaindata file.
pub chaindata: String,
+ /// The payment configuration.
pub payment_info: Option,
+ /// The Number of distinct output directories.
+ pub outputs: usize,
}
pub fn config() -> Config {
@@ -47,6 +53,12 @@ pub fn config() -> Config {
toml::from_str(&config_str).expect("Failed to parse config file")
}
-pub fn output_directory(rpc_index: usize) -> String {
- format!("{}-{}", config().output_directory, rpc_index)
+pub fn output_directory(rpc_index: Option) -> String {
+ let output_dir = config().output_directory.trim_end_matches('/').to_string();
+
+ if let Some(rpc_index) = rpc_index {
+ format!("{}/out-{}", output_dir, rpc_index)
+ } else {
+ format!("{}/out", output_dir)
+ }
}
diff --git a/shared/src/consumption.rs b/shared/src/consumption.rs
index c5ead88..5444a48 100644
--- a/shared/src/consumption.rs
+++ b/shared/src/consumption.rs
@@ -20,7 +20,7 @@ use types::{Parachain, WeightConsumption};
pub fn get_consumption(
para: Parachain,
- rpc_index: usize,
+ rpc_index: Option,
) -> Result, &'static str> {
let file =
File::open(output_file_path(para, rpc_index)).map_err(|_| "Consumption data not found")?;
@@ -37,11 +37,11 @@ pub fn get_consumption(
pub fn write_consumption(
para: Parachain,
consumption: WeightConsumption,
- rpc_index: usize,
+ rpc_index: Option,
) -> Result<(), std::io::Error> {
log::info!(
target: LOG_TARGET,
- "Writing weight consumption for Para {}-{} for block: #{}",
+ "{}-{} - Writing weight consumption for block: #{}",
para.relay_chain, para.para_id, consumption.block_number
);
@@ -51,24 +51,61 @@ pub fn write_consumption(
let mut wtr = WriterBuilder::new().from_writer(file);
// The data is stored in the sequence described at the beginning of the file.
- wtr.write_record(&[
- // Block number:
- consumption.block_number.to_string(),
- // Timestamp:
- consumption.timestamp.to_string(),
- // Reftime consumption:
- consumption.ref_time.normal.to_string(),
- consumption.ref_time.operational.to_string(),
- consumption.ref_time.mandatory.to_string(),
- // Proof size:
- consumption.proof_size.normal.to_string(),
- consumption.proof_size.operational.to_string(),
- consumption.proof_size.mandatory.to_string(),
- ])?;
+ wtr.write_record(&consumption.to_csv())?;
wtr.flush()
}
-fn output_file_path(para: Parachain, rpc_index: usize) -> String {
+pub fn write_batch_consumption(
+ para: Parachain,
+ consumption: Vec,
+) -> Result<(), std::io::Error> {
+ log::info!(
+ target: LOG_TARGET,
+ "{}-{} - Writing batch weight consumption.",
+ para.relay_chain, para.para_id
+ );
+
+ let output_file_path = output_file_path(para, None);
+ let file = OpenOptions::new().create(true).append(true).open(output_file_path)?;
+
+ let mut wtr = WriterBuilder::new().from_writer(file);
+
+ // TODO: add a to_csv function
+ consumption.iter().try_for_each(|entry| {
+ // The data is stored in the sequence described at the beginning of the file.
+ wtr.write_record(&entry.to_csv())
+ })?;
+
+ wtr.flush()
+}
+
+pub fn delete_consumption(para: Parachain, rpc_index: usize) {
+ log::info!(
+ target: LOG_TARGET,
+ "{}-{} - Deleting weight consumption.",
+ para.relay_chain, para.para_id
+ );
+
+ let output_file_path = output_file_path(para, Some(rpc_index));
+ match std::fs::remove_file(output_file_path.clone()) {
+ Ok(_) => {
+ log::info!(
+ target: LOG_TARGET,
+ "{} Deleted successfully",
+ output_file_path
+ );
+ },
+ Err(e) => {
+ log::error!(
+ target: LOG_TARGET,
+ "{} Failed to delete: {:?}",
+ output_file_path, e
+ );
+ },
+ }
+}
+
+fn output_file_path(para: Parachain, rpc_index: Option) -> String {
format!("{}/{}-{}.csv", output_directory(rpc_index), para.relay_chain, para.para_id)
}
diff --git a/shared/src/lib.rs b/shared/src/lib.rs
index 034c19a..c13c6ed 100644
--- a/shared/src/lib.rs
+++ b/shared/src/lib.rs
@@ -46,7 +46,7 @@ pub fn reset_mock_environment() {
// Reset the registered paras file:
let _registry = registry::init_registry();
- let output_path = output_directory(0);
+ let output_path = output_directory(None);
// Remove the output files:
let _ = std::fs::create_dir(output_path.clone());
diff --git a/types/src/lib.rs b/types/src/lib.rs
index 6d040e4..2dc6cee 100644
--- a/types/src/lib.rs
+++ b/types/src/lib.rs
@@ -126,3 +126,25 @@ impl fmt::Display for WeightConsumption {
Ok(())
}
}
+
+impl WeightConsumption {
+ /// Returns consumption data as a vector of strings, where each element
+ /// represents a column in a CSV format. Each string in the vector corresponds
+ /// to one column of data.
+ pub fn to_csv(&self) -> Vec {
+ vec![
+ // Block number:
+ self.block_number.to_string(),
+ // Timestamp:
+ self.timestamp.to_string(),
+ // Reftime consumption:
+ self.ref_time.normal.to_string(),
+ self.ref_time.operational.to_string(),
+ self.ref_time.mandatory.to_string(),
+ // Proof size:
+ self.proof_size.normal.to_string(),
+ self.proof_size.operational.to_string(),
+ self.proof_size.mandatory.to_string(),
+ ]
+ }
+}