Skip to content

Commit

Permalink
Consumption processing (#29)
Browse files Browse the repository at this point in the history
* Consumption processing

* fix tests

* process

* fix

* make clippy happy
  • Loading branch information
Szegoo authored Feb 10, 2024
1 parent 003d695 commit d1e3eb9
Show file tree
Hide file tree
Showing 21 changed files with 306 additions and 45 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,18 @@ jobs:

- name: Ensure the project builds
run: cargo build
test:
needs: install
runs-on: ubuntu-latest
steps:
- name: Use cashed cargo
uses: actions/cache@v3
with:
path: ~/.cargo
key: ${{ runner.os }}-rust-${{ hashFiles('rust-toolchain.toml') }}

- name: Checkout the source code
uses: actions/checkout@v3

- name: Ensure the project builds
run: cargo test -- --test-threads=1
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ license = "GPL-3.0-only"

[workspace]
members = [
"bin/processor",
"bin/server",
"bin/tracker",
"routes",
Expand Down
13 changes: 13 additions & 0 deletions bin/processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "processor"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
log = "0.4"
shared = { path = "../../shared" }
env_logger = "0.10.1"
polkadot-core-primitives = { git = "https://github.com/paritytech/polkadot-sdk", branch = "release-polkadot-v1.1.0" }
types = { path = "../../types" }
84 changes: 84 additions & 0 deletions bin/processor/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// This file is part of RegionX.
//
// RegionX is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// RegionX is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with RegionX. If not, see <https://www.gnu.org/licenses/>.

use shared::{
config::config,
consumption::{delete_consumption, get_consumption, write_batch_consumption},
registry::registered_paras,
};
use std::collections::BTreeMap;
use types::WeightConsumption;

const LOG_TARGET: &str = "processor";

fn main() {
env_logger::init();

let outputs = config().outputs;
let paras = registered_paras();

paras.iter().for_each(|para| {
let mut processed = BTreeMap::new();

log::info!(
target: LOG_TARGET,
"{}-{} - Processing consumption.",
para.relay_chain,
para.para_id,
);

(0..outputs).for_each(|output_index| {
let consumption = if let Ok(data) = get_consumption(para.clone(), Some(output_index)) {
data
} else {
log::error!(
target: LOG_TARGET,
"{}-{} - Failed to get consumption.",
para.relay_chain,
para.para_id,
);
vec![]
};

consumption.into_iter().for_each(|data| {
processed.entry(data.block_number).or_insert(data);
});
});

let processed: Vec<WeightConsumption> = processed.values().cloned().collect();

log::info!(
target: LOG_TARGET,
"{}-{} - Writing processed consumption. Total blocks tracked: {}",
para.relay_chain,
para.para_id,
processed.len()
);

if let Err(e) = write_batch_consumption(para.clone(), processed) {
log::error!(
target: LOG_TARGET,
"{}-{} - Failed to write batch consumption: {:?}",
para.relay_chain,
para.para_id,
e,
);

return;
}

(0..outputs).for_each(|output_index| delete_consumption(para.clone(), output_index));
});
}
24 changes: 17 additions & 7 deletions bin/tracker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,31 @@ async fn track_weight_consumption(para: Parachain, rpc_index: usize) {
let Some(rpc) = para.rpcs.get(rpc_index) else {
log::error!(
target: LOG_TARGET,
"Parachain {}-{} doesn't have an rpc with index: {}",
"{}-{} - doesn't have an rpc with index: {}",
para.relay_chain, para.para_id, rpc_index,
);
return;
};

log::info!("Starting to track consumption for: {}-{}", para.relay_chain, para.para_id);
log::info!("{}-{} - Starting to track consumption.", para.relay_chain, para.para_id);
let result = OnlineClient::<PolkadotConfig>::from_url(rpc).await;

if let Ok(api) = result {
if let Err(err) = track_blocks(api, para, rpc_index).await {
if let Err(err) = track_blocks(api, para.clone(), rpc_index).await {
log::error!(
target: LOG_TARGET,
"Failed to track new block: {:?}",
"{}-{} - Failed to track new block: {:?}",
para.relay_chain,
para.para_id,
err
);
}
} else {
log::error!(
target: LOG_TARGET,
"Failed to create online client: {:?}",
"{}-{} - Failed to create online client: {:?}",
para.relay_chain,
para.para_id,
result
);
}
Expand All @@ -110,7 +114,13 @@ async fn track_blocks(
para: Parachain,
rpc_index: usize,
) -> Result<(), Box<dyn std::error::Error>> {
log::info!("Subsciribing to finalized blocks for: {}", para.para_id);
log::info!(
target: LOG_TARGET,
"{}-{} - Subsciribing to finalized blocks",
para.relay_chain,
para.para_id
);

let mut blocks_sub = api
.blocks()
.subscribe_finalized()
Expand All @@ -136,7 +146,7 @@ async fn note_new_block(
let timestamp = timestamp_at(api.clone(), block.hash()).await?;
let consumption = weight_consumption(api, block_number, timestamp).await?;

write_consumption(para, consumption, rpc_index)?;
write_consumption(para, consumption, Some(rpc_index))?;

Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
output_directory = "out/out"
output_directory = "out/"
registry = "registry.json"
chaindata = "chaindata.json"
outputs = 2

#[payment_info]
#rpc_url = "wss://rococo-rpc.polkadot.io"
Expand Down
56 changes: 55 additions & 1 deletion registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,60 @@
],
"para_id": 2000,
"relay_chain": "Polkadot",
"expiry_timestamp": 1706728108
"expiry_timestamp": 1709740246
},
{
"name": "Polkadot",
"rpcs": [
"wss://rpc-polkadot.luckyfriday.io",
"wss://polkadot-rpc.dwellir.com"
],
"para_id": 0,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
},
{
"name": "Moonbeam",
"rpcs": [
"wss://moonbeam-rpc.dwellir.com",
"wss://wss.api.moonbeam.network",
"wss://1rpc.io/glmr",
"wss://moonbeam.unitedbloc.com"
],
"para_id": 2004,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
},
{
"name": "Astar",
"rpcs": [
"wss://astar.public.curie.radiumblock.co/ws",
"wss://rpc.astar.network",
"wss://astar-rpc.dwellir.com",
"wss://1rpc.io/astr"
],
"para_id": 2006,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
},
{
"name": "HydraDX",
"rpcs": [
"wss://hydradx-rpc.dwellir.com",
"wss://rpc.hydradx.cloud"
],
"para_id": 2034,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
},
{
"name": "Zeitgeist",
"rpcs": [
"wss://zeitgeist-rpc.dwellir.com",
"wss://main.rpc.zeitgeist.pm/ws"
],
"para_id": 2092,
"relay_chain": "Polkadot",
"expiry_timestamp": 1709740246
}
]
1 change: 1 addition & 0 deletions routes/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ output_directory = "mock-out"
registry = "mock-parachains.json"
chaindata = "../chaindata.json"
free_mode = true
outputs = 1

[payment_info]
rpc_url = "wss://rococo-rpc.polkadot.io"
Expand Down
Empty file added routes/mock-out/out/placeholder
Empty file.
2 changes: 1 addition & 1 deletion routes/src/consumption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub fn consumption(
let (start, end) = (start.unwrap_or_default(), end.unwrap_or(Timestamp::MAX));

// By default query the consumption that was collected from rpc index 0.
let weight_consumptions: Vec<WeightConsumption> = get_consumption(para, 0)
let weight_consumptions: Vec<WeightConsumption> = get_consumption(para, None)
.map_err(|_| Error::ConsumptionDataNotFound)?
.into_iter()
.filter(|consumption| consumption.timestamp >= start && consumption.timestamp <= end)
Expand Down
6 changes: 4 additions & 2 deletions routes/src/extend_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn extend_subscription(data: Json<ExtendSubscriptionData>) -> Result<(

log::info!(
target: LOG_TARGET,
"Attempting to extend subscription for para: {}:{}",
"{}-{} - Attempting to extend subscription for para",
relay_chain, para_id
);

Expand Down Expand Up @@ -76,7 +76,9 @@ pub async fn extend_subscription(data: Json<ExtendSubscriptionData>) -> Result<(
if let Err(err) = update_registry(paras) {
log::error!(
target: LOG_TARGET,
"Failed to extend subscription for para: {:?}",
"{}-{} Failed to extend subscription for para: {:?}",
para.relay_chain,
para.para_id,
err
);
}
Expand Down
8 changes: 5 additions & 3 deletions routes/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn register_para(registration_data: Json<RegistrationData>) -> Result<

log::info!(
target: LOG_TARGET,
"Attempting to register para: {}:{}",
"{}-{} - Attempting to register para",
relay_chain, para_id
);

Expand All @@ -71,12 +71,14 @@ pub async fn register_para(registration_data: Json<RegistrationData>) -> Result<

para.expiry_timestamp = current_timestamp() + subscription_duration;

paras.push(para);
paras.push(para.clone());

if let Err(err) = update_registry(paras) {
log::error!(
target: LOG_TARGET,
"Failed to register para: {:?}",
"{}-{} - Failed to register para: {:?}",
para.relay_chain,
para.para_id,
err
);
}
Expand Down
2 changes: 1 addition & 1 deletion routes/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl MockEnvironment {

for (para, weight_consumptions) in &mock.weight_consumptions {
weight_consumptions.iter().for_each(|consumption| {
write_consumption(para.clone(), consumption.clone(), 0)
write_consumption(para.clone(), consumption.clone(), None)
.expect("Failed to write conusumption data");
});
}
Expand Down
9 changes: 1 addition & 8 deletions scripts/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@
TRACKER_LOGS_0="logs/tracker-logs-0.out"
TRACKER_LOGS_1="logs/tracker-logs-1.out"

WATCHDOG_LOGS_0="logs/watchdog-logs-0.out"
WATCHDOG_LOGS_1="logs/watchdog-logs-1.out"

TRACKER="./target/release/tracker"
WATCHDOG="scripts/watchdog.sh"

reset_env() {
PIDS=$(pgrep -f "$TRACKER|$WATCHDOG")
PIDS=$(pgrep -f "$TRACKER")

if [ -z "$PIDS" ]; then
echo "Process not found."
Expand All @@ -27,6 +23,3 @@ reset_env
# start the tracker again
nohup sh -c 'RUST_LOG=INFO ./target/release/tracker --rpc-index 0' > $TRACKER_LOGS_0 2>&1 &
nohup sh -c 'RUST_LOG=INFO ./target/release/tracker --rpc-index 1' > $TRACKER_LOGS_1 2>&1 &
# start the watchdog
nohup sh -c 'scripts/watchdog.sh "$1"' _ 0 > $WATCHDOG_LOGS_0 2>&1 &
nohup sh -c 'scripts/watchdog.sh "$1"' _ 1 > $WATCHDOG_LOGS_1 2>&1 &
3 changes: 3 additions & 0 deletions scripts/process.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

sh -c 'RUST_LOG=INFO ./target/release/processor' >> logs/processor.out 2>&1
Empty file modified scripts/reset_env.sh
100644 → 100755
Empty file.
Loading

0 comments on commit d1e3eb9

Please sign in to comment.